Source code for nlp_architect.solutions.trend_analysis.np_scorer

# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import logging
from os import path, makedirs

from tqdm import tqdm

from nlp_architect.pipelines.spacy_np_annotator import NPAnnotator, get_noun_phrases
from nlp_architect.solutions.trend_analysis.scoring_utils import TextSpanScoring
from nlp_architect import LIBRARY_OUT
from nlp_architect.utils.io import download_unlicensed_file
from nlp_architect.utils.text import SpacyInstance

# Base URL from which the pre-trained chunker files are downloaded.
nlp_chunker_url = 'https://s3-us-west-2.amazonaws.com/nlp-architect-data/models/chunker/'
# File name of the chunker parameters file (second argument to NPAnnotator.load).
chunker_model_dat_file = 'model_info.dat.params'
# File name of the chunker model file (first argument to NPAnnotator.load).
chunker_model_file = 'model.h5'
# Local directory, under LIBRARY_OUT, where the downloaded chunker files are kept.
chunker_local_path = str(LIBRARY_OUT / 'chunker-pretrained')
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class NPScorer(object):
    """Extract noun phrases from texts and score them.

    The scorer owns a text-processing pipeline (``self.nlp``) to which a
    sentencizer is prepended and an ``NPAnnotator`` is appended. Documents
    run through that pipeline are handed to ``TextSpanScoring`` to obtain
    tf-idf, c-value and frequency scores per phrase.
    """

    def __init__(self, parser=None):
        """Build the pipeline and make sure the chunker files are available.

        Args:
            parser: optional pipeline object exposing ``create_pipe``,
                ``add_pipe`` and ``pipe``. When ``None``, the ``parser`` of a
                new ``SpacyInstance`` (with 'ner', 'parser', 'vectors' and
                'textcat' disabled) is used. A parser passed in is modified
                in place: components are added to it.
        """
        if parser is None:
            self.nlp = SpacyInstance(
                disable=['ner', 'parser', 'vectors', 'textcat']).parser
        else:
            self.nlp = parser

        self.nlp.add_pipe(self.nlp.create_pipe('sentencizer'), first=True)

        # exist_ok avoids the check-then-create race when two processes
        # initialise a scorer at the same time.
        makedirs(chunker_local_path, exist_ok=True)

        _path_to_model = path.join(chunker_local_path, chunker_model_file)
        if not path.exists(_path_to_model):
            logger.info(
                'The pre-trained model to be downloaded for NLP Architect word'
                ' chunker model is licensed under Apache 2.0')
            download_unlicensed_file(nlp_chunker_url, chunker_model_file, _path_to_model)

        _path_to_params = path.join(chunker_local_path, chunker_model_dat_file)
        if not path.exists(_path_to_params):
            download_unlicensed_file(nlp_chunker_url, chunker_model_dat_file, _path_to_params)

        self.nlp.add_pipe(NPAnnotator.load(_path_to_model, _path_to_params), last=True)

    @staticmethod
    def _score_map(scored_list):
        """Map ``tuple(entry[0])`` to ``entry[1]`` for every scored entry."""
        return {tuple(entry[0]): entry[1] for entry in scored_list}

    def score_documents(self, texts: list, limit=-1, return_all=False, min_tf=5):
        """Score the noun phrases found in ``texts``.

        Args:
            texts: non-empty list of texts to run through the pipeline.
            limit: when greater than 0, only the first ``limit`` entries of
                the tf-idf list (in the order ``get_tfidf_scores`` returns
                them) are kept, and the c-value and frequency lists are
                restricted to the same phrases.
            return_all: when True, return the individual normalized scores
                instead of the single merged score.
            min_tf: forwarded to ``TextSpanScoring``; must be greater than 0.

        Returns:
            An empty list when no document yields noun phrases or when no
            tf-idf scores are produced. Otherwise, if ``return_all`` is True,
            a list of ``(phrases, tfidf, cvalue, freq)`` tuples; else a list
            of ``(phrases, score)`` tuples restricted to entries in which at
            least one phrase has length greater than 1.

        Raises:
            AssertionError: if ``texts`` is empty or ``min_tf`` is not
                positive.
        """
        assert len(texts) > 0, 'texts should contain at least 1 document'
        assert min_tf > 0, 'min_tf should be at least 1'

        documents = []
        with tqdm(total=len(texts), desc='documents scoring progress', unit='docs') as pbar:
            for doc in self.nlp.pipe(texts, n_threads=-1):
                if len(doc) > 0:
                    documents.append(doc)
                # Advance once per processed text, including empty ones, so
                # the bar can reach total=len(texts).
                pbar.update(1)

        # Keep only documents that actually contain noun phrases.
        corpus = []
        for doc in documents:
            spans = get_noun_phrases(doc)
            if len(spans) > 0:
                corpus.append((doc, spans))
        if len(corpus) < 1:
            return []

        documents, doc_phrases = list(zip(*corpus))
        scorer = TextSpanScoring(documents=documents, spans=doc_phrases, min_tf=min_tf)
        tfidf_scored_list = scorer.get_tfidf_scores()
        if len(tfidf_scored_list) < 1:
            return []
        cvalue_scored_list = scorer.get_cvalue_scores()
        freq_scored_list = scorer.get_freq_scores()

        if limit > 0:
            tf = self._score_map(tfidf_scored_list)
            cv = self._score_map(cvalue_scored_list)
            fr = self._score_map(freq_scored_list)
            # The tf-idf list decides which phrases survive; the other two
            # lists are rebuilt for exactly those phrases, in the same order.
            top_phrases = list(zip(*tfidf_scored_list))[0][:limit]
            tfidf_scored_list = [(phrase, tf[tuple(phrase)]) for phrase in top_phrases]
            cvalue_scored_list = [(phrase, cv[tuple(phrase)]) for phrase in top_phrases]
            freq_scored_list = [(phrase, fr[tuple(phrase)]) for phrase in top_phrases]

        tfidf_scored_list = scorer.normalize_l2(tfidf_scored_list)
        cvalue_scored_list = scorer.normalize_l2(cvalue_scored_list)
        freq_scored_list = scorer.normalize_minmax(freq_scored_list, invert=True)
        tfidf_scored_list = scorer.normalize_minmax(tfidf_scored_list)
        cvalue_scored_list = scorer.normalize_minmax(cvalue_scored_list)

        if return_all:
            tf = self._score_map(tfidf_scored_list)
            cv = self._score_map(cvalue_scored_list)
            fr = self._score_map(freq_scored_list)
            return [(list(phrases), tf[phrases], cv[phrases], fr[phrases])
                    for phrases in tf]

        # Average tf-idf and c-value with equal weights, then multiply by the
        # frequency score and rescale.
        merged_list = scorer.interpolate_scores(
            [tfidf_scored_list, cvalue_scored_list], [0.5, 0.5])
        merged_list = scorer.multiply_scores([merged_list, freq_scored_list])
        merged_list = scorer.normalize_minmax(merged_list)

        return [(list(phrases), score)
                for phrases, score in merged_list
                if any(len(p) > 1 for p in phrases)]